/*-------------------------------------------------------------------------
 *
 * slot.c
 *       Replication slot management.
 *
 *
 * Copyright (c) 2012-2017, PostgreSQL Global Development Group
 *
 * This source code file contains modifications made by THL A29 Limited ("Tencent Modifications").
 * All Tencent Modifications are Copyright (C) 2023 THL A29 Limited.
 *
 *
 * IDENTIFICATION
 *      src/backend/replication/slot.c
 *
 * NOTES
 *
 * Replication slots are used to keep state about replication streams
 * originating from this cluster.  Their primary purpose is to prevent the
 * premature removal of WAL or of old tuple versions in a manner that would
 * interfere with replication; they are also useful for monitoring purposes.
 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
 * on standbys (to support cascading setups).  The requirement that slots be
 * usable on standbys precludes storing them in the system catalogs.
 *
 * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
 * directory. Inside that directory the state file will contain the slot's
 * own data. Additional data can be stored alongside that file if required.
 * While the server is running, the state data is also cached in memory for
 * efficiency.
 *
 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
 * to iterate over the slots, and in exclusive mode to change the in_use flag
 * of a slot.  The remaining data in each slot is protected by its mutex.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/transam.h"
#include "access/xlog_internal.h"
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/slot.h"
#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#ifdef _PUB_SUB_RELIABLE_
#include "access/replslotdesc.h"
#endif
#ifdef __STORAGE_SCALABLE__
#include "replication/logical_statistic.h"
#include "fmgr.h"
#endif


/*
 * Replication slot on-disk data structure.
 *
 * The layout is split into three regions, whose boundaries are expressed by
 * the size macros defined right after this struct:
 *   - 'magic' and 'checksum' come first and are NOT covered by the checksum;
 *   - everything from 'version' onwards IS covered by the checksum;
 *   - everything before 'slotdata' is the version independent header.
 */
typedef struct ReplicationSlotOnDisk
{
    /* first part of this struct needs to be version independent */

    /* data not covered by checksum */
    uint32        magic;        /* format identifier; presumably SLOT_MAGIC -
                                 * the readers/writers are outside this view */
    pg_crc32c    checksum;      /* CRC over the fields starting at 'version' */

    /* data covered by checksum */
    uint32        version;      /* layout version of 'slotdata'; presumably
                                 * SLOT_VERSION for newly written files */
    uint32        length;       /* NOTE(review): looks like the size of the
                                 * version dependent part - confirm against
                                 * the save/restore code */

    /*
     * The actual data in the slot that follows can differ based on the above
     * 'version'.
     */

    ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;

/*
 * Size helpers for ReplicationSlotOnDisk.
 *
 * Every replacement list that contains an operator is wrapped in parentheses
 * so that the macro behaves like a single value when it is embedded in a
 * larger expression (e.g. "2 * ReplicationSlotOnDiskV2Size").
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
    offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define SnapBuildOnDiskNotChecksummedSize \
    offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define SnapBuildOnDiskChecksummedSize \
    (sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
    (sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC        0x1051CA1    /* format identifier */
#define SLOT_VERSION    2        /* version for new files */
#ifdef _PUB_SUB_RELIABLE_
/* slotid value of an array entry that has never been allocated */
#define REPLICATION_SLOT_INVALID_ID (-1)
#endif

/*
 * Control array for replication slot management.
 *
 * Assigned by ReplicationSlotsShmemInit(); that function returns early and
 * leaves this NULL when max_replication_slots is 0.
 */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/*
 * My backend's replication slot in the shared memory array.
 *
 * Set by ReplicationSlotCreate() and ReplicationSlotAcquire(); reset to NULL
 * by ReplicationSlotDropAcquired() and (subject to the __STORAGE_SCALABLE__
 * condition there) by ReplicationSlotRelease().
 */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int            max_replication_slots = 0;    /* the maximum number of replication
                                         * slots */

/* dropping: the first works on MyReplicationSlot, the second on any slot */
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* renaming: same split as for dropping */
static void ReplicationSlotModifyAcquired(const char *new_name);
static void ReplicationSlotRenamePtr(ReplicationSlot *slot, const char * new_name);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);

#ifdef _PUB_SUB_RELIABLE_
/* defined in another translation unit */
extern bool g_replication_slot_debug;
/* WAL redo handlers, one per replication slot record type */
static void replication_slot_redo_create(xl_replication_slot_create * xlrec);
static void replication_slot_redo_drop(xl_replication_slot_drop * xlrec);
static void replication_slot_redo_rename(xl_replication_slot_rename * xlrec);
static void replication_slot_redo_lsn_update(xl_replication_slot_lsn_replica * xlrec);
#endif

/*
 * Compute how much shared memory ReplicationSlotsShmemInit will ask for.
 *
 * Returns 0 when replication slots are disabled (max_replication_slots == 0),
 * otherwise the size of the control header plus one ReplicationSlot per
 * configured slot.
 */
Size
ReplicationSlotsShmemSize(void)
{
    Size        slots_bytes;

    /* Nothing to reserve when the feature is switched off. */
    if (max_replication_slots == 0)
        return 0;

    /* the slot array itself ... */
    slots_bytes = mul_size(max_replication_slots, sizeof(ReplicationSlot));

    /* ... preceded by the fixed part of the control struct */
    return add_size(offsetof(ReplicationSlotCtlData, replication_slots),
                    slots_bytes);
}

/*
 * Allocate and initialize the shared memory used for replication slots.
 *
 * Does nothing when max_replication_slots is 0.  Otherwise attaches to (or
 * creates) the "ReplicationSlot Ctl" segment, registers the I/O lock tranche,
 * and - only for the process that actually created the segment - zeroes it and
 * initializes the per-slot synchronization primitives.
 */
void
ReplicationSlotsShmemInit(void)
{
    bool        already_initialized;
    int         slotno;

    if (max_replication_slots == 0)
        return;

    ReplicationSlotCtl = (ReplicationSlotCtlData *)
        ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
                        &already_initialized);

    LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
                          "replication_slot_io");

    /* Somebody else set the segment up before us; nothing more to do. */
    if (already_initialized)
        return;

    /* First time through, so start from an all-zero control area. */
    MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());

    for (slotno = 0; slotno < max_replication_slots; slotno++)
    {
        ReplicationSlot *cur = ReplicationSlotCtl->replication_slots + slotno;

#ifdef _PUB_SUB_RELIABLE_
        cur->data.slotid    = REPLICATION_SLOT_INVALID_ID;
#endif
        /* all remaining fields keep the zero written by MemSet above */
        SpinLockInit(&cur->mutex);
        LWLockInitialize(&cur->io_in_progress_lock,
                         LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
        ConditionVariableInit(&cur->active_cv);
    }
}

/*
 * Validate a replication slot name, reporting problems at 'elevel'.
 *
 * A valid name matches [a-z0-9_]{1,NAMEDATALEN-1}; that restricted alphabet
 * lets the name double as a directory name on every supported OS.
 *
 * name:   candidate slot name (must not be NULL)
 * elevel: error level handed to ereport() for each problem found
 *
 * Returns true for a valid name.  For an invalid name the problem is
 * reported, and false is returned if elevel < ERROR (i.e. if ereport()
 * comes back to us).
 */
bool
ReplicationSlotValidateName(const char *name, int elevel)
{// #lizard forgives
    size_t      namelen = strlen(name);
    const char *p;

    if (namelen == 0)
    {
        ereport(elevel,
                (errcode(ERRCODE_INVALID_NAME),
                 errmsg("replication slot name \"%s\" is too short",
                        name)));
        return false;
    }

    if (namelen >= NAMEDATALEN)
    {
        ereport(elevel,
                (errcode(ERRCODE_NAME_TOO_LONG),
                 errmsg("replication slot name \"%s\" is too long",
                        name)));
        return false;
    }

    for (p = name; *p != '\0'; p++)
    {
        char        ch = *p;
        bool        is_lower = (ch >= 'a' && ch <= 'z');
        bool        is_digit = (ch >= '0' && ch <= '9');

        /* acceptable character, keep scanning */
        if (is_lower || is_digit || ch == '_')
            continue;

        ereport(elevel,
                (errcode(ERRCODE_INVALID_NAME),
                 errmsg("replication slot name \"%s\" contains invalid character",
                        name),
                 errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
        return false;
    }

    return true;
}

/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot; validated with ReplicationSlotValidateName(ERROR).
 * db_specific: logical decoding is db specific; if the slot is going to
 *       be used for that pass true, otherwise false.
 * persistency: stored verbatim in the slot's persistent data.
 *
 * Raises ERROR if a slot with that name already exists or if every entry of
 * the slot array is in use.  On success MyReplicationSlot points at the new
 * slot and its active_pid is this backend's PID.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
                      ReplicationSlotPersistency persistency)
{// #lizard forgives
    ReplicationSlot *slot = NULL;
    int            i;
    int         slotid = -1;    /* array index of 'slot'; -1 until one is
                                 * found, so it is never read uninitialized */

    Assert(MyReplicationSlot == NULL);

    ReplicationSlotValidateName(name, ERROR);

    /*
     * If some other backend ran this code concurrently with us, we'd likely
     * both allocate the same slot, and that would be bad.  We'd also be at
     * risk of missing a name collision.  Also, we don't want to try to create
     * a new slot while somebody's busy cleaning up an old one, because we
     * might both be monkeying with the same directory.
     */
    LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

    /*
     * Check for name collision, and identify an allocatable slot.  We need to
     * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     * else can change the in_use flags while we're looking at them.
     */
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

        if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                     errmsg("replication slot \"%s\" already exists", name)));

        /* remember the first free entry, but keep scanning for collisions */
        if (!s->in_use && slot == NULL)
        {
            slotid = i;
            slot = s;
        }
    }
    LWLockRelease(ReplicationSlotControlLock);

    /* If all slots are in use, we're out of luck. */
    if (slot == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                 errmsg("all replication slots are in use"),
                 errhint("Free one or increase max_replication_slots.")));

    /*
     * Since this slot is not in use, nobody should be looking at any part of
     * it other than the in_use field unless they're trying to allocate it.
     * And since we hold ReplicationSlotAllocationLock, nobody except us can
     * be doing that.  So it's safe to initialize the slot.
     */
    Assert(!slot->in_use);
    Assert(slot->active_pid == 0);

    /* first initialize persistent data */
    memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
    StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
    slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
    slot->data.persistency = persistency;

    /* and then data only present in shared memory */
    slot->just_dirtied = false;
    slot->dirty = false;
    slot->effective_xmin = InvalidTransactionId;
    slot->effective_catalog_xmin = InvalidTransactionId;
    slot->candidate_catalog_xmin = InvalidTransactionId;
    slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    slot->candidate_restart_valid = InvalidXLogRecPtr;
    slot->candidate_restart_lsn = InvalidXLogRecPtr;
#ifdef __STORAGE_SCALABLE__
    slot->pgoutput = false;
    slot->subid = InvalidOid;
    slot->relid = InvalidOid;
    slot->shards = NULL;
    slot->npubs = 0;
    slot->alltables = NULL;
    slot->pubshards = NULL;
    slot->tables = NULL;
    slot->ntups_insert = 0;
    slot->ntups_delete = 0;
    slot->checksum_insert = 0;
    slot->checksum_delete = 0;
#endif

#ifdef _PUB_SUB_RELIABLE_
    /* must come after the memset of slot->data above */
    slot->data.slotid = slotid;
#endif


    /*
     * Create the slot on disk.  We haven't actually marked the slot allocated
     * yet, so no special cleanup is required if this errors out.
     */
    CreateSlotOnDisk(slot);

    /*
     * We need to briefly prevent any other backend from iterating over the
     * slots while we flip the in_use flag. We also need to set the active
     * flag while holding the ControlLock as otherwise a concurrent
     * SlotAcquire() could acquire the slot as well.
     */
    LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

    slot->in_use = true;

    /* We can now mark the slot active, and that makes it our slot. */
    SpinLockAcquire(&slot->mutex);
    Assert(slot->active_pid == 0);
    slot->active_pid = MyProcPid;
    SpinLockRelease(&slot->mutex);
    MyReplicationSlot = slot;

    LWLockRelease(ReplicationSlotControlLock);

    /*
     * Now that the slot has been marked as in_use and active, it's safe to
     * let somebody else try to allocate a slot.
     */
    LWLockRelease(ReplicationSlotAllocationLock);

#ifdef _PUB_SUB_RELIABLE_
    /* treat REPLICATION_KIND_LOGICAL first for picc, REPLICATION_KIND_PHYSICAL later */
    replica_slot_wal_record_creat_slot();
#endif

    /* Let everybody know we've modified this slot */
    ConditionVariableBroadcast(&slot->active_cv);
}

/*
 * Find a previously created slot and mark it as used by this backend.
 *
 * name:   name of the slot to look up among the in_use entries.
 * nowait: if the slot is active in another backend, true means raise an
 *         ERROR right away; false means sleep on the slot's condition
 *         variable and then retry the whole lookup.
 *
 * Raises ERROR if no in_use slot has that name.  On return
 * MyReplicationSlot points at the slot and its active_pid is MyProcPid.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
    ReplicationSlot *slot;
    int            active_pid;
    int            i;

retry:
    Assert(MyReplicationSlot == NULL);

    /*
     * Search for the named slot and mark it active if we find it.  If the
     * slot is already active, we exit the loop with active_pid set to the PID
     * of the backend that owns it.
     */
    active_pid = 0;
    slot = NULL;
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

        if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
        {
            /*
             * This is the slot we want.  We don't know yet if it's active, so
             * get ready to sleep on it in case it is.  (We may end up not
             * sleeping, but we don't want to do this while holding the
             * spinlock.)
             */
            ConditionVariablePrepareToSleep(&s->active_cv);

            SpinLockAcquire(&s->mutex);

            /* claim the slot only if nobody currently owns it */
            active_pid = s->active_pid;
            if (active_pid == 0)
                active_pid = s->active_pid = MyProcPid;

            SpinLockRelease(&s->mutex);
            slot = s;

            break;
        }
    }
    LWLockRelease(ReplicationSlotControlLock);

    /* If we did not find the slot, error out. */
    if (slot == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("replication slot \"%s\" does not exist", name)));

    /*
     * If we found the slot but it's already active in another backend, we
     * either error out or retry after a short wait, as caller specified.
     */
    if (active_pid != MyProcPid)
    {
        if (nowait)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_IN_USE),
                     errmsg("replication slot \"%s\" is active for PID %d",
                            name, active_pid)));

        /* Wait here until we get signaled, and then restart */
        ConditionVariableSleep(&slot->active_cv,
                               WAIT_EVENT_REPLICATION_SLOT_DROP);
        ConditionVariableCancelSleep();
        goto retry;
    }
    else
        ConditionVariableCancelSleep(); /* no sleep needed after all */

    /* Let everybody know we've modified this slot */
    ConditionVariableBroadcast(&slot->active_cv);

    /* We made this slot active, so it's ours now. */
    MyReplicationSlot = slot;
}

/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * Behavior by persistency, as implemented below:
 *   RS_EPHEMERAL  - the slot is dropped outright;
 *   RS_PERSISTENT - active_pid is reset to 0 and waiters are woken;
 *   anything else - active_pid is left untouched here.
 * In every case PROC_IN_LOGICAL_DECODING is cleared from this backend's
 * vacuumFlags at the end.
 */
void
ReplicationSlotRelease(void)
{// #lizard forgives
    ReplicationSlot *slot = MyReplicationSlot;

    Assert(slot != NULL && slot->active_pid != 0);

    if (slot->data.persistency == RS_EPHEMERAL)
    {
        /*
         * Delete the slot. There is no !PANIC case where this is allowed to
         * fail, all that may happen is an incomplete cleanup of the on-disk
         * data.
         */
        ReplicationSlotDropAcquired();

        /*
         * NOTE(review): ReplicationSlotDropAcquired() resets
         * MyReplicationSlot and marks the array entry as not in_use, yet the
         * code below keeps reading 'slot'.  Presumably harmless because the
         * entry lives in the shared array - confirm.
         */
    }

    /*
     * If slot needed to temporarily restrain both data and catalog xmin to
     * create the catalog snapshot, remove that temporary constraint.
     * Snapshots can only be exported while the initial snapshot is still
     * acquired.
     */
    if (!TransactionIdIsValid(slot->data.xmin) &&
        TransactionIdIsValid(slot->effective_xmin))
    {
        SpinLockAcquire(&slot->mutex);
        slot->effective_xmin = InvalidTransactionId;
        SpinLockRelease(&slot->mutex);
        ReplicationSlotsComputeRequiredXmin(false);
    }

    if (slot->data.persistency == RS_PERSISTENT)
    {
        /*
         * Mark persistent slot inactive.  We're not freeing it, just
         * disconnecting, but wake up others that may be waiting for it.
         */
        SpinLockAcquire(&slot->mutex);
        slot->active_pid = 0;
        SpinLockRelease(&slot->mutex);
        ConditionVariableBroadcast(&slot->active_cv);
    }

#ifdef __STORAGE_SCALABLE__
    /*
     * Forget the slot unless it is a pgoutput slot with both a valid subid
     * and a valid relid; in that one case MyReplicationSlot is deliberately
     * left pointing at the slot.  (MyReplicationSlot may already be NULL
     * here if the ephemeral branch above dropped it.)
     */
    if (MyReplicationSlot && !(MyReplicationSlot->pgoutput && OidIsValid(MyReplicationSlot->subid)
        && OidIsValid(MyReplicationSlot->relid)))
    {
        MyReplicationSlot = NULL;
    }
#else
    MyReplicationSlot = NULL;
#endif
    /* might not have been set when we've been a plain slot */
    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
    LWLockRelease(ProcArrayLock);
}

/*
 * Drop every slot that is still marked active for this backend.
 *
 * Must be called with no slot acquired.  Any in_use slot whose active_pid is
 * MyProcPid at this point is expected to be an RS_TEMPORARY one; each such
 * slot is dropped and the scan is started over from the beginning.
 */
void
ReplicationSlotCleanup(void)
{
    int            slotno;

    Assert(MyReplicationSlot == NULL);

restart:
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (slotno = 0; slotno < max_replication_slots; slotno++)
    {
        ReplicationSlot *candidate = ReplicationSlotCtl->replication_slots + slotno;

        if (!candidate->in_use)
            continue;

        SpinLockAcquire(&candidate->mutex);
        if (candidate->active_pid != MyProcPid)
        {
            /* belongs to somebody else (or to nobody): leave it alone */
            SpinLockRelease(&candidate->mutex);
            continue;
        }

        Assert(candidate->data.persistency == RS_TEMPORARY);
        SpinLockRelease(&candidate->mutex);

        /*
         * Dropping takes ReplicationSlotControlLock exclusively, so let go
         * of our shared hold first to avoid deadlock.
         */
        LWLockRelease(ReplicationSlotControlLock);

        ReplicationSlotDropPtr(candidate);

        ConditionVariableBroadcast(&candidate->active_cv);

        /* the lock was released, so the scan has to begin afresh */
        goto restart;
    }

    LWLockRelease(ReplicationSlotControlLock);
}

/*
 * Drop the replication slot called 'name' for good.
 *
 * The slot is acquired first - 'nowait' is handed straight to
 * ReplicationSlotAcquire() - and then dropped as the acquired slot.
 */
void
ReplicationSlotDrop(const char *name, bool nowait)
{
    /* this backend must not be holding a slot already */
    Assert(MyReplicationSlot == NULL);

    /* step 1: make the slot ours */
    ReplicationSlotAcquire(name, nowait);

    /* step 2: get rid of it */
    ReplicationSlotDropAcquired();
}

/*
 * Drop the slot this backend currently has acquired, for good.
 *
 * MyReplicationSlot is reset to NULL before the actual drop is performed.
 */
static void
ReplicationSlotDropAcquired(void)
{
    ReplicationSlot *doomed = MyReplicationSlot;

    Assert(doomed != NULL);

#ifdef _PUB_SUB_RELIABLE_
    /* issued while MyReplicationSlot still points at the slot */
    replication_slot_wal_record_drop();
#endif

    /* from here on we no longer own a slot */
    MyReplicationSlot = NULL;

    ReplicationSlotDropPtr(doomed);
}

/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * slot: entry of the shared slot array to drop.
 *
 * Steps, in order: rename the slot directory to "<name>.tmp" and fsync it,
 * clear in_use/active_pid in shared memory, recompute the required xmin and
 * LSN, and finally remove the renamed directory.
 * ReplicationSlotAllocationLock is held exclusively throughout.
 *
 * If the rename fails, the slot is marked inactive and the failure is
 * reported as ERROR for RS_PERSISTENT slots and as WARNING for all others.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
    char        path[MAXPGPATH];
    char        tmppath[MAXPGPATH];

    /*
     * If some other backend ran this code concurrently with us, we might try
     * to delete a slot with a certain name while someone else was trying to
     * create a slot with the same name.
     */
    LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

    /* Generate pathnames. */
    sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
    sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

    /*
     * Rename the slot directory on disk, so that we'll no longer recognize
     * this as a valid slot.  Note that if this fails, we've got to mark the
     * slot inactive before bailing out.  If we're dropping an ephemeral or a
     * temporary slot, we better never fail hard as the caller won't expect
     * the slot to survive and this might get called during error handling.
     */
    if (rename(path, tmppath) == 0)
    {
        /*
         * We need to fsync() the directory we just renamed and its parent to
         * make sure that our changes are on disk in a crash-safe fashion.  If
         * fsync() fails, we can't be sure whether the changes are on disk or
         * not.  For now, we handle that by panicking;
         * StartupReplicationSlots() will try to straighten it out after
         * restart.
         */
        START_CRIT_SECTION();
        fsync_fname(tmppath, true);
        fsync_fname("pg_replslot", true);
        END_CRIT_SECTION();
    }
    else
    {
        /* only persistent slots are allowed to fail hard */
        bool        fail_softly = slot->data.persistency != RS_PERSISTENT;

        SpinLockAcquire(&slot->mutex);
        slot->active_pid = 0;
        SpinLockRelease(&slot->mutex);

        /* wake up anyone waiting on this slot */
        ConditionVariableBroadcast(&slot->active_cv);

        /*
         * NOTE(review): in the WARNING case control continues below, so the
         * array entry is still freed even though the directory kept its
         * original name - confirm this is intended.
         */
        ereport(fail_softly ? WARNING : ERROR,
                (errcode_for_file_access(),
                 errmsg("could not rename file \"%s\" to \"%s\": %m",
                        path, tmppath)));
    }

#ifdef __STORAGE_SCALABLE__
    /*
     * For a persistent slot carrying a valid subid, also invoke the two
     * opentenbase_remove_* functions, keyed by subid and by subname
     * respectively.  Presumably these discard publication statistics -
     * their definitions are outside this file.
     */
    if (OidIsValid(slot->subid) && slot->data.persistency == RS_PERSISTENT)
    {
        DirectFunctionCall1Coll(opentenbase_remove_pubtable_stat, InvalidOid,
                                UInt32GetDatum(slot->subid));
        DirectFunctionCall1Coll(opentenbase_remove_pub_stat, InvalidOid,
                                PointerGetDatum(cstring_to_text(slot->subname.data)));
    }
#endif

    /*
     * The slot is definitely gone.  Lock out concurrent scans of the array
     * long enough to kill it.  It's OK to clear the active PID here without
     * grabbing the mutex because nobody else can be scanning the array here,
     * and nobody can be attached to this slot and thus access it without
     * scanning the array.
     *
     * Also wake up processes waiting for it.
     */
    LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    slot->active_pid = 0;
    slot->in_use = false;
    LWLockRelease(ReplicationSlotControlLock);
    ConditionVariableBroadcast(&slot->active_cv);

    /*
     * Slot is dead and doesn't prevent resource removal anymore, recompute
     * limits.
     */
    ReplicationSlotsComputeRequiredXmin(false);
    ReplicationSlotsComputeRequiredLSN();

    /*
     * If removing the directory fails, the worst thing that will happen is
     * that the user won't be able to create a new slot with the same name
     * until the next server restart.  We warn about it, but that's all.
     */
    if (!rmtree(tmppath, true))
        ereport(WARNING,
                (errcode_for_file_access(),
                 errmsg("could not remove directory \"%s\"", tmppath)));

    /*
     * We release this at the very end, so that nobody starts trying to create
     * a slot while we're still cleaning up the detritus of the old one.
     */
    LWLockRelease(ReplicationSlotAllocationLock);
}

/*
 * Rename the replication slot currently called 'old_name' to 'new_name'.
 *
 * The slot is acquired first - 'nowait' is handed straight to
 * ReplicationSlotAcquire() - and then renamed as the acquired slot.
 */
void
ReplicationSlotModify(const char *old_name, const char *new_name, bool nowait)
{
    /* this backend must not be holding a slot already */
    Assert(MyReplicationSlot == NULL);

    /* step 1: make the slot ours */
    ReplicationSlotAcquire(old_name, nowait);

    /* step 2: give it its new name */
    ReplicationSlotModifyAcquired(new_name);
}

/*
 * Rename the slot this backend currently has acquired to 'new_name'.
 */
static void
ReplicationSlotModifyAcquired(const char *new_name)
{
    ReplicationSlot *acquired = MyReplicationSlot;

    Assert(acquired != NULL);

#ifdef _PUB_SUB_RELIABLE_
    /* issued before the rename itself is carried out */
    replication_slot_wal_record_rename_slot(new_name);
#endif

    ReplicationSlotRenamePtr(acquired, new_name);
}

/*
 * Permanently rename the given replication slot to new_name.
 *
 * slot:     slot to rename; must not be NULL (ERROR otherwise).
 * new_name: new slot name; validated with ReplicationSlotValidateName(ERROR).
 *
 * Raises ERROR if another in_use slot already carries new_name or if the
 * slot directory cannot be renamed.  This function does not reset
 * active_pid or MyReplicationSlot, so the slot stays acquired on return.
 */
static void
ReplicationSlotRenamePtr(ReplicationSlot *slot, const char * new_name)
{
    int     i;
    char    old_path[MAXPGPATH];
    char    new_path[MAXPGPATH];

    /*
     * If the slot is null, we're out of luck.  This is a runtime check rather
     * than an Assert so that it is also enforced in non-assert builds.
     */
    if (slot == NULL)
        elog(ERROR, "cannot perform rename slot name without an acquired slot");

    ReplicationSlotValidateName(new_name, ERROR);

    /* Generate pathnames; old_path must be built before the name changes. */
    sprintf(old_path, "pg_replslot/%s", NameStr(slot->data.name));
    sprintf(new_path, "pg_replslot/%s", new_name);

    /*
     * If some other backend ran this code concurrently with us, we'd likely
     * both rename the same slot, and that would be bad.  We'd also be at
     * risk of missing a name collision.  Also, we don't want to try to rename
     * a slot while somebody's busy cleaning up an old one, because we
     * might both be monkeying with the same directory.
     */
    LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

    /*
     * Check for name collision.  We need to hold ReplicationSlotControlLock
     * in shared mode for this, so that nobody else can change the in_use
     * flags while we're looking at them.
     */
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

        if (s->in_use && strcmp(new_name, NameStr(s->data.name)) == 0)
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                            errmsg("replication slot \"%s\" already exists", new_name)));
    }
    LWLockRelease(ReplicationSlotControlLock);

    /* Rename the directory into place. */
    if (rename(old_path, new_path) != 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                        errmsg("Fail to rename file \"%s\" to \"%s\": %m",
                               old_path, new_path)));

    /*
     * Only now that the directory carries the new name do we update the
     * in-memory name.  Doing it before the rename would leave shared memory
     * pointing at a directory that does not exist if the rename failed.
     */
    StrNCpy(NameStr(slot->data.name), new_name, NAMEDATALEN);

    /*
     * If we'd now fail - really unlikely - we wouldn't know whether this slot
     * would persist after an OS crash or not - so, force a restart. The
     * restart would try to fsync this again till it works.
     */
    START_CRIT_SECTION();
    fsync_fname(new_path, true);
    fsync_fname("pg_replslot", true);
    END_CRIT_SECTION();

    /* rewrite the on-disk state under the new name */
    CreateSlotOnDisk(slot);

    /* Let everybody know we've modified this slot */
    ConditionVariableBroadcast(&slot->active_cv);

    /*
     * Now that the slot has been renamed, it's safe to
     * let somebody else try to allocate or rename a slot.
     */
    LWLockRelease(ReplicationSlotAllocationLock);
}

/*
 * Serialize the currently acquired slot's state from memory to disk, thereby
 * guaranteeing the current state will survive a crash.
 */
void
ReplicationSlotSave(void)
{
    char        path[MAXPGPATH];

    Assert(MyReplicationSlot != NULL);

    sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
    SaveSlotToPath(MyReplicationSlot, path, ERROR);
}

/*
 * Signal that it would be useful if the currently acquired slot would be
 * flushed out to disk.
 *
 * Note that the actual flush to disk can be delayed for a long time, if
 * required for correctness explicitly do a ReplicationSlotSave().
 */
void
ReplicationSlotMarkDirty(void)
{
    ReplicationSlot *slot = MyReplicationSlot;

    Assert(MyReplicationSlot != NULL);

    SpinLockAcquire(&slot->mutex);
    MyReplicationSlot->just_dirtied = true;
    MyReplicationSlot->dirty = true;
    SpinLockRelease(&slot->mutex);
}

/*
 * Turn the currently acquired, not yet persistent slot into an
 * RS_PERSISTENT one and write it out immediately, so that it is still
 * there after an eventual crash.
 */
void
ReplicationSlotPersist(void)
{
    ReplicationSlot *mine = MyReplicationSlot;

    Assert(mine != NULL);
    Assert(mine->data.persistency != RS_PERSISTENT);

    /* flip the persistency under the slot's mutex */
    SpinLockAcquire(&mine->mutex);
    mine->data.persistency = RS_PERSISTENT;
    SpinLockRelease(&mine->mutex);

    /* mark dirty first, then save */
    ReplicationSlotMarkDirty();
    ReplicationSlotSave();

#ifdef __OPENTENBASE__
    replica_slot_wal_record_creat_slot();
#endif
}

/*
 * Determine the oldest data xmin and the oldest catalog xmin over all slots
 * that are in use, and hand both to the ProcArray.
 *
 * already_locked is passed through to ProcArraySetReplicationSlotXmin();
 * pass true if ProcArrayLock has already been acquired exclusively.
 */
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
{// #lizard forgives
    TransactionId oldest_xmin = InvalidTransactionId;
    TransactionId oldest_catalog_xmin = InvalidTransactionId;
    int            slotno;

    Assert(ReplicationSlotCtl != NULL);

    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

    for (slotno = 0; slotno < max_replication_slots; slotno++)
    {
        ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
        TransactionId slot_xmin;
        TransactionId slot_catalog_xmin;

        if (slot->in_use)
        {
            /* take a consistent copy of both horizons */
            SpinLockAcquire(&slot->mutex);
            slot_xmin = slot->effective_xmin;
            slot_catalog_xmin = slot->effective_catalog_xmin;
            SpinLockRelease(&slot->mutex);

            /* fold the data xmin into the running minimum */
            if (TransactionIdIsValid(slot_xmin))
            {
                if (!TransactionIdIsValid(oldest_xmin) ||
                    TransactionIdPrecedes(slot_xmin, oldest_xmin))
                    oldest_xmin = slot_xmin;
            }

            /* fold the catalog xmin into the running minimum */
            if (TransactionIdIsValid(slot_catalog_xmin))
            {
                if (!TransactionIdIsValid(oldest_catalog_xmin) ||
                    TransactionIdPrecedes(slot_catalog_xmin, oldest_catalog_xmin))
                    oldest_catalog_xmin = slot_catalog_xmin;
            }
        }
    }

    LWLockRelease(ReplicationSlotControlLock);

    ProcArraySetReplicationSlotXmin(oldest_xmin, oldest_catalog_xmin,
                                    already_locked);
}

/*
 * Find the smallest valid restart_lsn over all slots that are in use and
 * report it to the xlog module.  InvalidXLogRecPtr is reported when no slot
 * has a valid restart_lsn.
 */
void
ReplicationSlotsComputeRequiredLSN(void)
{
    XLogRecPtr    oldest_lsn = InvalidXLogRecPtr;
    int            slotno;

    Assert(ReplicationSlotCtl != NULL);

    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

    for (slotno = 0; slotno < max_replication_slots; slotno++)
    {
        ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
        XLogRecPtr    slot_lsn;

        if (slot->in_use)
        {
            /* copy the value under the slot's spinlock */
            SpinLockAcquire(&slot->mutex);
            slot_lsn = slot->data.restart_lsn;
            SpinLockRelease(&slot->mutex);

            /* slots without a valid restart_lsn do not take part */
            if (slot_lsn != InvalidXLogRecPtr)
            {
                if (oldest_lsn == InvalidXLogRecPtr || slot_lsn < oldest_lsn)
                    oldest_lsn = slot_lsn;
            }
        }
    }

    LWLockRelease(ReplicationSlotControlLock);

    XLogSetReplicationSlotMinimumLSN(oldest_lsn);
}

/*
 * Compute the oldest WAL LSN required by *logical* decoding slots.
 *
 * Returns InvalidXLogRecPtr if max_replication_slots is not positive or if
 * no logical slot with a valid restart_lsn exists.
 *
 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
 * ignores physical replication slots.
 *
 * The results aren't required frequently, so we don't maintain a precomputed
 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
 */
XLogRecPtr
ReplicaSlotsComputLogicalRstartLSN(void)
{
    XLogRecPtr    result = InvalidXLogRecPtr;
    int            i;

    if (max_replication_slots <= 0)
        return InvalidXLogRecPtr;

    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s;
        XLogRecPtr    restart_lsn;

        s = &ReplicationSlotCtl->replication_slots[i];

        /* cannot change while ReplicationSlotControlLock is held */
        if (!s->in_use)
            continue;

        /* we're only interested in logical slots */
        if (!SlotIsLogical(s))
            continue;

        /* read once, it's ok if it increases while we're checking */
        SpinLockAcquire(&s->mutex);
        restart_lsn = s->data.restart_lsn;
        SpinLockRelease(&s->mutex);

        /*
         * A slot that has no valid restart_lsn must not take part in the
         * minimum: InvalidXLogRecPtr compares lower than every real LSN, so
         * it would otherwise reset the result and make us forget the oldest
         * LSN found so far.  This mirrors the check done in
         * ReplicationSlotsComputeRequiredLSN().
         */
        if (restart_lsn == InvalidXLogRecPtr)
            continue;

        if (result == InvalidXLogRecPtr ||
            restart_lsn < result)
            result = restart_lsn;
    }

    LWLockRelease(ReplicationSlotControlLock);

    return result;
}

/*
 * ReplicationSlotsCountDBSlots -- count the logical slots that belong to the
 * database identified by dboid.
 *
 * *nslots receives the total number of such slots, *nactive the number of
 * those that currently have an active PID.  The return value is true iff at
 * least one slot references the database.
 */
bool
ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
{
    int            slotno;

    *nactive = 0;
    *nslots = 0;

    if (max_replication_slots <= 0)
        return false;

    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

    for (slotno = 0; slotno < max_replication_slots; slotno++)
    {
        ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];

        /*
         * Skip slots that are unused (in_use cannot change while
         * ReplicationSlotControlLock is held), physical slots (only logical
         * slots are database specific), and slots of other databases.
         */
        if (!slot->in_use ||
            !SlotIsLogical(slot) ||
            slot->data.database != dboid)
            continue;

        /* update the counters with the slot's spinlock held */
        SpinLockAcquire(&slot->mutex);
        (*nslots)++;
        if (slot->active_pid != 0)
            (*nactive)++;
        SpinLockRelease(&slot->mutex);
    }

    LWLockRelease(ReplicationSlotControlLock);

    return *nslots > 0;
}

/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * The slot array is rescanned from the start after every dropped slot,
 * because the control lock is released around each drop.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
    int            i;

    /* nothing to do when no slots are configured */
    if (max_replication_slots <= 0)
        return;

restart:
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s;
        char       *slotname;
        int            active_pid;

        s = &ReplicationSlotCtl->replication_slots[i];

        /* cannot change while ReplicationSlotControlLock is held */
        if (!s->in_use)
            continue;

        /* only logical slots are database specific, skip */
        if (!SlotIsLogical(s))
            continue;

        /* not our database, skip */
        if (s->data.database != dboid)
            continue;

        /*
         * Acquire the slot, so ReplicationSlotDropAcquired can be reused.
         * If nobody else has the slot active, claim it for this backend by
         * making it MyReplicationSlot and storing our PID in it.
         */
        SpinLockAcquire(&s->mutex);
        /* can't change while ReplicationSlotControlLock is held */
        slotname = NameStr(s->data.name);
        active_pid = s->active_pid;
        if (active_pid == 0)
        {
            MyReplicationSlot = s;
            s->active_pid = MyProcPid;
        }
        SpinLockRelease(&s->mutex);

        /*
         * Even though we hold an exclusive lock on the database object a
         * logical slot for that DB can still be active, e.g. if it's
         * concurrently being dropped by a backend connected to another DB.
         *
         * That's fairly unlikely in practice, so we'll just bail out.
         */
        if (active_pid)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_IN_USE),
                     errmsg("replication slot \"%s\" is active for PID %d",
                            slotname, active_pid)));

        /*
         * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
         * holding ReplicationSlotControlLock over filesystem operations,
         * release ReplicationSlotControlLock and use
         * ReplicationSlotDropAcquired.
         *
         * As that means the set of slots could change, restart scan from the
         * beginning each time we release the lock.
         */
        LWLockRelease(ReplicationSlotControlLock);
        ReplicationSlotDropAcquired();
        goto restart;
    }
    /* a complete pass found no more matching slots */
    LWLockRelease(ReplicationSlotControlLock);
}


/*
 * Verify that the server configuration allows replication slots to be used
 * at all; raise an ERROR if it does not.
 */
void
CheckSlotRequirements(void)
{
    /* at least one slot must be configured */
    if (max_replication_slots == 0)
    {
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 (errmsg("replication slots can only be used if max_replication_slots > 0"))));
    }

    /* and WAL has to be generated at replica level or above */
    if (wal_level < WAL_LEVEL_REPLICA)
    {
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("replication slots can only be used if wal_level >= replica")));
    }
}

/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 */
void
ReplicationSlotReserveWal(void)
{
    ReplicationSlot *slot = MyReplicationSlot;

    Assert(slot != NULL);
    Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

    /*
     * The replication slot mechanism is used to prevent removal of required
     * WAL. As there is no interlock between this routine and checkpoints, WAL
     * segments could concurrently be removed when a now stale return value of
     * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
     * this happens we'll just retry.
     */
    while (true)
    {
        XLogSegNo    segno;

        /*
         * For logical slots log a standby snapshot and start logical decoding
         * at exactly that position. That allows the slot to start up more
         * quickly.
         *
         * That's not needed (or indeed helpful) for physical slots as they'll
         * start replay at the last logged checkpoint anyway. Instead return
         * the location of the last redo LSN. While that slightly increases
         * the chance that we have to retry, it's where a base backup has to
         * start replay at.
         */
        if (!RecoveryInProgress() && SlotIsLogical(slot))
        {
            XLogRecPtr    flushptr;

            /* start at current insert position */
            slot->data.restart_lsn = GetXLogInsertRecPtr();

            /* make sure we have enough information to start */
            flushptr = LogStandbySnapshot();

            /* and make sure it's fsynced to disk */
            XLogFlush(flushptr);
        }
        else
        {
            slot->data.restart_lsn = GetRedoRecPtr();
        }

        /* prevent WAL removal as fast as possible */
        ReplicationSlotsComputeRequiredLSN();

        /*
         * If all required WAL is still there, great, otherwise retry. The
         * slot should prevent further removal of WAL, unless there's a
         * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
         * the new restart_lsn above, so normally we should never need to loop
         * more than twice.
         */
        XLByteToSeg(slot->data.restart_lsn, segno);
        if (XLogGetLastRemovedSegno() < segno)
            break;
    }
}

/*
 * Write every in-use replication slot out to disk.
 *
 * Doing this as part of a checkpoint is not a requirement, merely a
 * convenient moment.  Write problems are reported at LOG level.
 */
void
CheckPointReplicationSlots(void)
{
    int            slotno;

    elog(DEBUG1, "performing replication slot checkpoint");

    /*
     * Keep slots from being created or dropped while we work.  We
     * deliberately do not take the control lock, as that would block
     * backends iterating over the slot array or acquiring a slot; holding
     * ReplicationSlotAllocationLock is strictly stronger and is enough to
     * guarantee that the in_use flags stay put.
     */
    LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);

    for (slotno = 0; slotno < max_replication_slots; slotno++)
    {
        ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
        char        slotdir[MAXPGPATH];

        if (slot->in_use)
        {
            /* SaveSlotToPath() takes care of the per-slot locking */
            sprintf(slotdir, "pg_replslot/%s", NameStr(slot->data.name));
            SaveSlotToPath(slot, slotdir, LOG);
        }
    }

    LWLockRelease(ReplicationSlotAllocationLock);
}

/*
 * Read all replication slots found below pg_replslot into memory at server
 * startup, removing leftover "*.tmp" directories on the way.  This needs to
 * be run before we start crash recovery.
 */
void
StartupReplicationSlots(void)
{// #lizard forgives
    DIR           *slots_dir;
    struct dirent *entry;

    elog(DEBUG1, "starting up replication slots");

    /* walk over every on-disk entry and restore what looks like a slot */
    slots_dir = AllocateDir("pg_replslot");
    while ((entry = ReadDir(slots_dir, "pg_replslot")) != NULL)
    {
        const char *entry_name = entry->d_name;
        struct stat st;
        char        path[MAXPGPATH + 12];

        if (strcmp(entry_name, ".") == 0 ||
            strcmp(entry_name, "..") == 0)
            continue;

        snprintf(path, sizeof(path), "pg_replslot/%s", entry_name);

        /* slots are directories; anything else is not ours, skip it */
        if (lstat(path, &st) == 0 && !S_ISDIR(st.st_mode))
            continue;

        if (!pg_str_endswith(entry_name, ".tmp"))
        {
            /* looks like a slot in a normal state, restore */
            RestoreSlotFromDisk(entry_name);
            continue;
        }

        /* we crashed while a slot was being set up or deleted, clean up */
        if (rmtree(path, true))
            fsync_fname("pg_replslot", true);
        else
            ereport(WARNING,
                    (errcode_for_file_access(),
                     errmsg("could not remove directory \"%s\"", path)));
    }
    FreeDir(slots_dir);

    /* recompute the horizons only if slots can exist at all */
    if (max_replication_slots > 0)
    {
        /* Now that we have recovered all the data, compute replication xmin */
        ReplicationSlotsComputeRequiredXmin(false);
        ReplicationSlotsComputeRequiredLSN();
    }
}

/* ----
 * Manipulation of on-disk state of replication slots
 *
 * NB: none of the routines below should take any notice whether a slot is the
 * current one or not, that's all handled a layer above.
 * ----
 */
/*
 * Create the on-disk representation of a slot.
 *
 * The state file is first written into a temporary "<name>.tmp" directory
 * below pg_replslot, which is then renamed to its final name and fsynced
 * together with pg_replslot itself.  Failures to create the directory, to
 * write the state file, or to rename are reported at ERROR level.
 */
static void
CreateSlotOnDisk(ReplicationSlot *slot)
{// #lizard forgives
    char        tmppath[MAXPGPATH];
    char        path[MAXPGPATH];
    struct stat st;

    /*
     * No need to take out the io_in_progress_lock, nobody else can see this
     * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
     * takes out the lock, if we'd take the lock here, we'd deadlock.
     */

    /* final and temporary directory names for this slot */
    sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
    sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

    /*
     * It's just barely possible that some previous effort to create or drop a
     * slot with this name left a temp directory lying around. If that seems
     * to be the case, try to remove it.  If the rmtree() fails, we'll error
     * out at the mkdir() below, so we don't bother checking success.
     */
    if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
        rmtree(tmppath, true);

    /* Create and fsync the temporary slot directory. */
    if (mkdir(tmppath, S_IRWXU) < 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not create directory \"%s\": %m",
                        tmppath)));
    fsync_fname(tmppath, true);

#ifdef _PUB_SUB_RELIABLE_
    /*
     * Remove a directory that already exists under the final name, so that
     * the rename() below can succeed.  The result of rmtree() is not checked
     * here.
     */
    if (stat(path, &st) == 0 && S_ISDIR(st.st_mode))
        rmtree(path, true);
#endif

    /* Write the actual state file. */
    slot->dirty = true;            /* signal that we really need to write */
    SaveSlotToPath(slot, tmppath, ERROR);

    /* Rename the directory into place. */
    if (rename(tmppath, path) != 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not rename file \"%s\" to \"%s\": %m",
                        tmppath, path)));

    /*
     * If we'd now fail - really unlikely - we wouldn't know whether this slot
     * would persist after an OS crash or not - so, force a restart. The
     * restart would try to fsync this again till it works.
     */
    START_CRIT_SECTION();

    fsync_fname(path, true);
    fsync_fname("pg_replslot", true);

    END_CRIT_SECTION();
}

/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Writes the slot's persistent data to "<dir>/state" by first writing and
 * fsyncing "<dir>/state.tmp" and then renaming it into place.  Nothing is
 * done unless the slot is marked dirty.
 *
 * slot:   slot whose data is written
 * dir:    directory that holds (or will hold) the state file
 * elevel: error level used to report failures that happen before the final
 *         rename has succeeded; if it is below ERROR the function returns
 *         early and the slot stays dirty
 *
 * On every error path errno is captured before any cleanup and restored
 * right before ereport(), because closing the file or releasing the lock
 * could change it and the messages rely on %m.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{// #lizard forgives
    char        tmppath[MAXPGPATH];
    char        path[MAXPGPATH];
    int            fd;
    ReplicationSlotOnDisk cp;
    bool        was_dirty;

    /* first check whether there's something to write out */
    SpinLockAcquire(&slot->mutex);
    was_dirty = slot->dirty;
    slot->just_dirtied = false;
    SpinLockRelease(&slot->mutex);

    /* and don't do anything if there's nothing to write */
    if (!was_dirty)
        return;

    LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

    /* silence valgrind :( */
    memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

    sprintf(tmppath, "%s/state.tmp", dir);
    sprintf(path, "%s/state", dir);

    fd = OpenTransientFile(tmppath,
                           O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
                           S_IRUSR | S_IWUSR);
    if (fd < 0)
    {
        int            save_errno = errno;

        /*
         * With elevel below ERROR we return to the caller, so the lock has
         * to be released explicitly.
         */
#ifdef __OPENTENBASE__
        LWLockRelease(&slot->io_in_progress_lock);
#endif
        errno = save_errno;
        ereport(elevel,
                (errcode_for_file_access(),
                 errmsg("could not create file \"%s\": %m",
                        tmppath)));
        return;
    }

    cp.magic = SLOT_MAGIC;
    INIT_CRC32C(cp.checksum);
    cp.version = SLOT_VERSION;
    cp.length = ReplicationSlotOnDiskV2Size;

    /* copy the persistent data under the spinlock to get a consistent image */
    SpinLockAcquire(&slot->mutex);

    memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

    SpinLockRelease(&slot->mutex);

    COMP_CRC32C(cp.checksum,
                (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
                SnapBuildOnDiskChecksummedSize);
    FIN_CRC32C(cp.checksum);

    /*
     * Reset errno first: a short write() does not set it, and we want to be
     * able to tell that case apart below.
     */
    errno = 0;
    pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    {
        int            save_errno = errno;

        pgstat_report_wait_end();
        CloseTransientFile(fd);

#ifdef __OPENTENBASE__
        LWLockRelease(&slot->io_in_progress_lock);
#endif

        /* if write didn't set errno, assume problem is no disk space */
        errno = save_errno ? save_errno : ENOSPC;
        ereport(elevel,
                (errcode_for_file_access(),
                 errmsg("could not write to file \"%s\": %m",
                        tmppath)));
        return;
    }
    pgstat_report_wait_end();

    /* fsync the temporary file */
    pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    if (pg_fsync(fd) != 0)
    {
        int            save_errno = errno;

        pgstat_report_wait_end();
        CloseTransientFile(fd);
#ifdef __OPENTENBASE__
        LWLockRelease(&slot->io_in_progress_lock);
#endif
        errno = save_errno;
        ereport(elevel,
                (errcode_for_file_access(),
                 errmsg("could not fsync file \"%s\": %m",
                        tmppath)));
        return;
    }
    pgstat_report_wait_end();

    CloseTransientFile(fd);

    /* rename to permanent file, fsync file and directory */
    if (rename(tmppath, path) != 0)
    {
        int            save_errno = errno;

#ifdef __OPENTENBASE__
        LWLockRelease(&slot->io_in_progress_lock);
#endif
        errno = save_errno;
        ereport(elevel,
                (errcode_for_file_access(),
                 errmsg("could not rename file \"%s\" to \"%s\": %m",
                        tmppath, path)));
        return;
    }

    /* Check CreateSlot() for the reasoning of using a crit. section. */
    START_CRIT_SECTION();

    fsync_fname(path, false);
    fsync_fname(dir, true);
    fsync_fname("pg_replslot", true);

    END_CRIT_SECTION();

    /*
     * Successfully wrote, unset dirty bit, unless somebody dirtied again
     * already.
     */
    SpinLockAcquire(&slot->mutex);
    if (!slot->just_dirtied)
        slot->dirty = false;
    SpinLockRelease(&slot->mutex);

    LWLockRelease(&slot->io_in_progress_lock);
}

/*
 * Load a single slot from disk into memory.
 *
 * name is the slot's directory name below pg_replslot.  The state file is
 * synced, read and validated (magic, version, length, CRC); any failure in
 * that phase is a PANIC.  A slot that is not RS_PERSISTENT is removed from
 * disk instead of being restored.
 */
static void
RestoreSlotFromDisk(const char *name)
{// #lizard forgives
    ReplicationSlotOnDisk cp;
    char        slotdir[MAXPGPATH + 12];
    char        path[MAXPGPATH + 22];
    int            fd;
    int            readBytes;
    pg_crc32c    checksum;
    ReplicationSlot *slot;

    /* no need to lock here, no concurrent access allowed yet */

    /* delete temp file if it exists */
    sprintf(slotdir, "pg_replslot/%s", name);
    sprintf(path, "pg_replslot/%s/state.tmp", name);
    if (unlink(path) < 0 && errno != ENOENT)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not remove file \"%s\": %m", path)));

    sprintf(path, "pg_replslot/%s/state", name);

    elog(DEBUG1, "restoring replication slot from \"%s\"", path);

    fd = OpenTransientFile(path, O_RDWR | PG_BINARY, 0);

    /*
     * We do not need to handle this as we are rename()ing the directory into
     * place only after we fsync()ed the state file.
     */
    if (fd < 0)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not open file \"%s\": %m", path)));

    /*
     * Sync state file before we're reading from it. We might have crashed
     * while it wasn't synced yet and we shouldn't continue on that basis.
     */
    pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    if (pg_fsync(fd) != 0)
    {
        /* keep the fsync error; closing the file could overwrite errno */
        int            saved_errno = errno;

        CloseTransientFile(fd);
        errno = saved_errno;
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not fsync file \"%s\": %m",
                        path)));
    }
    pgstat_report_wait_end();

    /*
     * Also sync the parent directory.  This has to name the slot's
     * directory; passing the state file here would merely sync that file a
     * second time and leave the directory entry unsynced.
     */
    START_CRIT_SECTION();
    fsync_fname(slotdir, true);
    END_CRIT_SECTION();

    /* read part of statefile that's guaranteed to be version independent */
    pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    pgstat_report_wait_end();
    if (readBytes != ReplicationSlotOnDiskConstantSize)
    {
        int            saved_errno = errno;

        CloseTransientFile(fd);
        errno = saved_errno;
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not read file \"%s\", read %d of %u: %m",
                        path, readBytes,
                        (uint32) ReplicationSlotOnDiskConstantSize)));
    }

    /* verify magic */
    if (cp.magic != SLOT_MAGIC)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
                        path, cp.magic, SLOT_MAGIC)));

    /* verify version */
    if (cp.version != SLOT_VERSION)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("replication slot file \"%s\" has unsupported version %u",
                        path, cp.version)));

    /* boundary check on length */
    if (cp.length != ReplicationSlotOnDiskV2Size)
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("replication slot file \"%s\" has corrupted length %u",
                        path, cp.length)));

    /* Now that we know the size, read the entire file */
    pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    readBytes = read(fd,
                     (char *) &cp + ReplicationSlotOnDiskConstantSize,
                     cp.length);
    pgstat_report_wait_end();
    if (readBytes != cp.length)
    {
        int            saved_errno = errno;

        CloseTransientFile(fd);
        errno = saved_errno;
        ereport(PANIC,
                (errcode_for_file_access(),
                 errmsg("could not read file \"%s\", read %d of %u: %m",
                        path, readBytes, cp.length)));
    }

    CloseTransientFile(fd);

    /* now verify the CRC */
    INIT_CRC32C(checksum);
    COMP_CRC32C(checksum,
                (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
                SnapBuildOnDiskChecksummedSize);
    FIN_CRC32C(checksum);

    if (!EQ_CRC32C(checksum, cp.checksum))
        ereport(PANIC,
                (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
                        path, checksum, cp.checksum)));

    /*
     * If we crashed with an ephemeral slot active, don't restore but delete
     * it.
     */
    if (cp.slotdata.persistency != RS_PERSISTENT)
    {
        if (!rmtree(slotdir, true))
        {
            ereport(WARNING,
                    (errcode_for_file_access(),
                     errmsg("could not remove directory \"%s\"", slotdir)));
        }
        fsync_fname("pg_replslot", true);
        return;
    }

#ifdef _PUB_SUB_RELIABLE_
    /* the slot id stored in the file selects the in-memory array entry */
    if (cp.slotdata.slotid <= REPLICATION_SLOT_INVALID_ID || cp.slotdata.slotid >= max_replication_slots)
    {
        elog(ERROR, "RestoreSlotFromDisk: invalid slot id %d", cp.slotdata.slotid);
    }

    /* nothing can be active yet, don't lock anything */
    slot = &ReplicationSlotCtl->replication_slots[cp.slotdata.slotid];

    elog(LOG, "slot info in restoring, info in file, slotid:%d, slot_name: %s; "
              "info in slot slotid:%d, active_pid:%d, slot_name: %s, in_use: %d",
         cp.slotdata.slotid,
         NameStr(cp.slotdata.name),
         slot->data.slotid,
         slot->active_pid,
         NameStr(slot->data.name),
         slot->in_use);

    /* two state files claiming the same array entry cannot both be restored */
    if (slot->in_use)
    {
        elog(PANIC, "slot info conflict in restoring, info in file, slotid:%d; info in slot slotid:%d, active_pid:%d",
            cp.slotdata.slotid,
            slot->data.slotid,
            slot->active_pid);
    }

    /* restore the entire set of persistent data */
    memcpy(&slot->data, &cp.slotdata, sizeof(ReplicationSlotPersistentData));

    /* initialize in memory state */
    slot->effective_xmin            = cp.slotdata.xmin;
    slot->effective_catalog_xmin    = cp.slotdata.catalog_xmin;

    slot->candidate_catalog_xmin    = InvalidTransactionId;
    slot->candidate_xmin_lsn        = InvalidXLogRecPtr;
    slot->candidate_restart_lsn     = InvalidXLogRecPtr;
    slot->candidate_restart_valid   = InvalidXLogRecPtr;

    slot->in_use                    = true;
    slot->active_pid                = 0;
#ifdef __OPENTENBASE__
    /* reset the OpenTenBase-specific in-memory fields */
    slot->pgoutput = false;
    slot->subid = InvalidOid;
    slot->relid = InvalidOid;
    slot->shards = NULL;
    slot->npubs = 0;
    slot->alltables = NULL;
    slot->pubshards = NULL;
    slot->tables = NULL;
    slot->ntups_insert = 0;
    slot->ntups_delete = 0;
    slot->checksum_insert = 0;
    slot->checksum_delete = 0;
#endif

    return;
#endif
}

#ifdef _PUB_SUB_RELIABLE_
void replica_slot_wal_record_lsn_replic(void)
{
    xl_replication_slot_lsn_replica xlrec;

    if (RS_PERSISTENT != MyReplicationSlot->data.persistency)
    {
        /* skip TEMPORARY slot */
        return;
    }

    SpinLockAcquire(&MyReplicationSlot->mutex);
    
    xlrec.slotid            = MyReplicationSlot->data.slotid;
    xlrec.xmin                = MyReplicationSlot->data.xmin;
    xlrec.catalog_xmin        = MyReplicationSlot->data.catalog_xmin;
    xlrec.restart_lsn        = MyReplicationSlot->data.restart_lsn;
    xlrec.confirmed_flush    = MyReplicationSlot->data.confirmed_flush;

    SpinLockRelease(&MyReplicationSlot->mutex);
    if (g_replication_slot_debug)
    {
        elog(LOG, "replication_slot_wal_record_lsn_replica info in xlrec, slot id:%d, xmin:%u, catalog_xmin:%u, "
                "restart_lsn:%X/%X, confirmed_flush:%X/%X",
                xlrec.slotid,
                xlrec.xmin,
                xlrec.catalog_xmin,
                (uint32) (xlrec.restart_lsn >> 32), (uint32) xlrec.restart_lsn,
                (uint32) (xlrec.confirmed_flush >> 32), (uint32) xlrec.confirmed_flush);
    }
    XLogBeginInsert();
    XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
    XLogInsert(RM_RELICATION_SLOT_ID, XLOG_REPLORIGIN_SLOT_LSN_REPLICA);
    
    return;
}

void replica_slot_wal_record_creat_slot(void)
{
    xl_replication_slot_create xlrec;

    if (RS_PERSISTENT != MyReplicationSlot->data.persistency)
    {
        /* skip TEMPORARY slot */
        return;
    }
    
    xlrec.slotid = MyReplicationSlot->data.slotid;
    
    /* in ReplicationSlotPersistentData */
    StrNCpy(NameStr(xlrec.slotname), NameStr(MyReplicationSlot->data.name), NAMEDATALEN);
    xlrec.database              = MyReplicationSlot->data.database;
    xlrec.persistency           = MyReplicationSlot->data.persistency;
    xlrec.xmin                  = MyReplicationSlot->data.xmin;
    xlrec.catalog_xmin          = MyReplicationSlot->data.catalog_xmin;
    xlrec.restart_lsn           = MyReplicationSlot->data.restart_lsn;
    xlrec.confirmed_flush       = MyReplicationSlot->data.confirmed_flush;
    StrNCpy(NameStr(xlrec.pluginname), NameStr(MyReplicationSlot->data.plugin), NAMEDATALEN);

    /* in ReplicationSlot */
    xlrec.effective_xmin        = MyReplicationSlot->effective_xmin;
    xlrec.effective_catalog_xmin= MyReplicationSlot->effective_catalog_xmin;
    xlrec.pgoutput              = MyReplicationSlot->pgoutput;
    xlrec.subid                 = MyReplicationSlot->subid;
    StrNCpy(NameStr(xlrec.subname), NameStr(MyReplicationSlot->subname), NAMEDATALEN);
    xlrec.relid                 = MyReplicationSlot->relid;

    elog(LOG, "replication_slot_wal_record_create_slot info in xlrec, slotid:%d, name:%s, database:%u, persistency:%d, xmin:%u, catalog_xmin:%u, "
            "restart_lsn:%X/%X, confirmed_flush:%X/%X, plugin:%s, effective_xmin:%u, pgoutput:%d, subid:%u, relid:%u ", 
            xlrec.slotid, 
            NameStr(xlrec.slotname), 
            xlrec.database,
            xlrec.persistency, xlrec.xmin, xlrec.catalog_xmin,
            (uint32) (xlrec.restart_lsn >> 32), (uint32) xlrec.restart_lsn,
            (uint32) (xlrec.confirmed_flush >> 32), (uint32) xlrec.confirmed_flush,
            NameStr(xlrec.pluginname),
            xlrec.effective_xmin, xlrec.pgoutput, xlrec.subid, xlrec.relid);


    /* assemble and insert */
    XLogBeginInsert();
    XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
    XLogInsert(RM_RELICATION_SLOT_ID, XLOG_REPLORIGIN_SLOT_CREATE);
    
    return;
}

void replication_slot_wal_record_drop(void)
{
    xl_replication_slot_drop xlrec;
    
    if (RS_PERSISTENT != MyReplicationSlot->data.persistency)
    {
        /* skip TEMPORARY slot */
        return;
    }

    xlrec.slotid = MyReplicationSlot->data.slotid;
    StrNCpy(NameStr(xlrec.slotname), NameStr(MyReplicationSlot->data.name), NAMEDATALEN);

    elog(LOG, "replication_slot_wal_record_drop info in xlrec, slotid:%d, name:%s", 
            xlrec.slotid, 
            NameStr(xlrec.slotname));
    
    /* assemble and insert */
    XLogBeginInsert();
    XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
    XLogInsert(RM_RELICATION_SLOT_ID, XLOG_REPLORIGIN_SLOT_DROP);

    return;
}

void replication_slot_wal_record_rename_slot(const char * new_name)
{
    xl_replication_slot_rename xlrec;

    if (RS_PERSISTENT != MyReplicationSlot->data.persistency)
    {
        /* skip TEMPORARY slot */
        return;
    }

    xlrec.slotid = MyReplicationSlot->data.slotid;
    StrNCpy(NameStr(xlrec.old_slotname), NameStr(MyReplicationSlot->data.name), NAMEDATALEN);
    StrNCpy(NameStr(xlrec.new_slotname), new_name, NAMEDATALEN);

    elog(LOG, "replication_slot_wal_record_rename_slot info in xlrec, slotid:%d, old_slot_name:%s, new_slot_name:%s",
         xlrec.slotid,
         NameStr(xlrec.old_slotname),
         NameStr(xlrec.new_slotname));

    /* assemble and insert */
    XLogBeginInsert();
    XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
    XLogInsert(RM_RELICATION_SLOT_ID, XLOG_REPLORIGIN_SLOT_RENAME);

    return;
}

/*
 * Redo entry point for replication-slot WAL records.
 *
 * record: the WAL record being replayed.
 *
 * The rmgr-specific bits of the record's info byte select the operation, and
 * the record's data is forwarded to the matching replication_slot_redo_*
 * routine.  An unrecognised op code raises PANIC.
 */
void
replication_slot_redo(XLogReaderState *record)
{
    uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    uint8       opcode = info & XLR_RMGR_INFO_MASK;

    if (opcode == XLOG_REPLORIGIN_SLOT_LSN_REPLICA)
    {
        /* xmin / LSN progress of an existing slot */
        replication_slot_redo_lsn_update((xl_replication_slot_lsn_replica *) XLogRecGetData(record));
    }
    else if (opcode == XLOG_REPLORIGIN_SLOT_CREATE)
    {
        replication_slot_redo_create((xl_replication_slot_create *) XLogRecGetData(record));
    }
    else if (opcode == XLOG_REPLORIGIN_SLOT_DROP)
    {
        replication_slot_redo_drop((xl_replication_slot_drop *) XLogRecGetData(record));
    }
    else if (opcode == XLOG_REPLORIGIN_SLOT_RENAME)
    {
        replication_slot_redo_rename((xl_replication_slot_rename *) XLogRecGetData(record));
    }
    else
    {
        elog(PANIC, "replication_slot_redo: unknown op code %u", info);
    }
}

/*
 * Redo handler for XLOG_REPLORIGIN_SLOT_LSN_REPLICA records.
 *
 * xlrec: decoded record carrying the slot id plus xmin, catalog_xmin,
 *        restart_lsn and confirmed_flush values.
 *
 * The shared-memory slot at index xlrec->slotid is advanced field by field:
 * an xid is overwritten when the slot's value is InvalidTransactionId or
 * precedes the record's value; an LSN is overwritten when the slot's value
 * is smaller than the record's.  If anything was overwritten, the required
 * xmin and LSN are recomputed and the slot is flagged dirty.
 *
 * An out-of-range slot id raises PANIC.  A record whose slot id differs from
 * the id stored in the slot entry is logged and otherwise ignored.
 */
static void replication_slot_redo_lsn_update(xl_replication_slot_lsn_replica * xlrec)
{// #lizard forgives
    ReplicationSlot *target;
    bool            changed = false;

    if (!(xlrec->slotid > REPLICATION_SLOT_INVALID_ID && xlrec->slotid < max_replication_slots))
    {
        elog(PANIC, "replication_slot_lsn_update: get invalid slotid %d", xlrec->slotid);
    }

    target = &ReplicationSlotCtl->replication_slots[xlrec->slotid];

    /*
     * During redo the slot is not expected to be in use; only its state is
     * refreshed here, and it is marked in use once replication starts.
     * An entry that does not carry this slot id is left alone.
     */
    if (target->data.slotid != xlrec->slotid)
    {
        elog(LOG, "replication_slot_lsn_update: slot(id:%d) in wal, in_use:%d, slotid in data:%d",
             xlrec->slotid, target->in_use, target->data.slotid);
        return;
    }

    /* optional dump of the slot state before it is touched */
    if (g_replication_slot_debug)
    {
        elog(LOG, "replication_slot_redo_lsn_update info in slot, slot id:%d, xmin:%u, catalog_xmin:%u, "
                "restart_lsn:%X/%X, confirmed_flush:%X/%X",
             target->data.slotid,
             target->data.xmin,
             target->data.catalog_xmin,
             (uint32) (target->data.restart_lsn >> 32), (uint32) target->data.restart_lsn,
             (uint32) (target->data.confirmed_flush >> 32), (uint32) target->data.confirmed_flush);
    }
#if 0
    elog(LOG, "replication_slot_redo_lsn_update info in xlrec, slot id:%d, xmin:%u, catalog_xmin:%u, "
                "restart_lsn:%X/%X, confirmed_flush:%X/%X",
                xlrec->slotid,
                xlrec->xmin,
                xlrec->catalog_xmin,
                (uint32) (xlrec->restart_lsn >> 32), (uint32) xlrec->restart_lsn,
                (uint32) (xlrec->confirmed_flush >> 32), (uint32) xlrec->confirmed_flush);
#endif

    /* transaction id horizons: take the record's value if ours is unset or older */
    if (target->data.xmin == InvalidTransactionId ||
        TransactionIdPrecedes(target->data.xmin, xlrec->xmin))
    {
        target->data.xmin = xlrec->xmin;
        changed = true;
    }
    if (target->data.catalog_xmin == InvalidTransactionId ||
        TransactionIdPrecedes(target->data.catalog_xmin, xlrec->catalog_xmin))
    {
        target->data.catalog_xmin = xlrec->catalog_xmin;
        changed = true;
    }

    /* WAL positions: only ever move forward */
    if (xlrec->restart_lsn > target->data.restart_lsn)
    {
        target->data.restart_lsn = xlrec->restart_lsn;
        changed = true;
    }
    if (xlrec->confirmed_flush > target->data.confirmed_flush)
    {
        target->data.confirmed_flush = xlrec->confirmed_flush;
        changed = true;
    }

    if (!changed)
        return;

    /*
     * NOTE(review): only effective_catalog_xmin is refreshed here;
     * effective_xmin is left as it was even if data.xmin moved -- confirm
     * that this is intended.
     */
    target->effective_catalog_xmin = target->data.catalog_xmin;

    ReplicationSlotsComputeRequiredXmin(false);
    ReplicationSlotsComputeRequiredLSN();

    /*
     * Leave the disk write to the checkpoint.  slot->mutex is not taken
     * here; per the original author's note, the startup process does this
     * work by itself.
     */
    target->just_dirtied = true;
    target->dirty = true;
}

/*
 * Redo handler for XLOG_REPLORIGIN_SLOT_CREATE records.
 *
 * xlrec: decoded record describing the created slot (id, name, database,
 *        persistency, xmin, catalog_xmin, restart_lsn, confirmed_flush,
 *        plugin name, effective xmins, pgoutput flag, subscription id and
 *        name, relation id).
 *
 * Logs the current content of the shared-memory entry at index
 * xlrec->slotid, resets that entry, loads it from the record, marks it in
 * use, recomputes the required LSN and xmin, and finally calls
 * CreateSlotOnDisk() for it.
 *
 * An out-of-range slot id raises PANIC.
 */
static void replication_slot_redo_create(xl_replication_slot_create * xlrec)
{// #lizard forgives
    ReplicationSlot*slot;
    
    if (xlrec->slotid <= REPLICATION_SLOT_INVALID_ID ||xlrec->slotid >= max_replication_slots)
    {
        elog(PANIC, "replication_slot_redo_create: get invalid slotid %d", xlrec->slotid);
    }

    slot = &ReplicationSlotCtl->replication_slots[xlrec->slotid];

    /*
     * Dump what the entry holds before it is overwritten.  The name and
     * plugin are only printed when the entry has a non-zero slot id and a
     * non-empty name.
     */
    if ((0 != slot->data.slotid) && (strncmp(NameStr(slot->data.name), "", NAMEDATALEN) != 0))
    {
        elog(LOG, "replication_slot_redo_create info in slot, slotid:%d, name:%s, database:%u, persistency:%d, xmin:%u, catalog_xmin:%u, "
            "restart_lsn:%X/%X, confirmed_flush:%X/%X, plugin:%s, in_use:%d, effective_xmin:%u, just_dirtied:%d, dirty:%d, pgoutput:%d, subid:%u, relid:%u ", 
            slot->data.slotid, 
            NameStr(slot->data.name), 
            slot->data.database,
            slot->data.persistency, slot->data.xmin, slot->data.catalog_xmin,
            (uint32) (slot->data.restart_lsn >> 32), (uint32) slot->data.restart_lsn,
            (uint32) (slot->data.confirmed_flush >> 32), (uint32) slot->data.confirmed_flush,
            NameStr(slot->data.plugin),
            slot->in_use,
            slot->effective_xmin, slot->just_dirtied, slot->dirty, slot->pgoutput, slot->subid, slot->relid);
    }
    else
    {
        elog(LOG, "replication_slot_redo_create info in slot, slotid:%d, database:%u, persistency:%d, xmin:%u, catalog_xmin:%u, "
            "restart_lsn:%X/%X, confirmed_flush:%X/%X, in_use:%d, effective_xmin:%u, just_dirtied:%d, dirty:%d, pgoutput:%d, subid:%u, relid:%u ", 
            slot->data.slotid, 
            slot->data.database,
            slot->data.persistency, slot->data.xmin, slot->data.catalog_xmin,
            (uint32) (slot->data.restart_lsn >> 32), (uint32) slot->data.restart_lsn,
            (uint32) (slot->data.confirmed_flush >> 32), (uint32) slot->data.confirmed_flush,
            slot->in_use,
            slot->effective_xmin, slot->just_dirtied, slot->dirty, slot->pgoutput, slot->subid, slot->relid);
    }
#if 0
    elog(LOG, "replication_slot_redo_create info in xlrec, slotid:%d, name:%s, database:%u, persistency:%d, xmin:%u, catalog_xmin:%u, "
            "restart_lsn:%X/%X, confirmed_flush:%X/%X, plugin:%s, effective_xmin:%u, pgoutput:%d, subid:%u, relid:%u ", 
            xlrec->slotid, 
            NameStr(xlrec->slotname), 
            xlrec->database,
            xlrec->persistency, xlrec->xmin, xlrec->catalog_xmin,
            (uint32) (xlrec->restart_lsn >> 32), (uint32) xlrec->restart_lsn,
            (uint32) (xlrec->confirmed_flush >> 32), (uint32) xlrec->confirmed_flush,
            NameStr(xlrec->pluginname),
            xlrec->effective_xmin, xlrec->pgoutput, xlrec->subid, xlrec->relid);
#endif 
    /* first reset the persistent data */
    memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
    /* and then the data only present in shared memory */
    slot->just_dirtied = false;
    slot->dirty = false;
    slot->effective_xmin = InvalidTransactionId;
    slot->effective_catalog_xmin = InvalidTransactionId;
    slot->candidate_catalog_xmin = InvalidTransactionId;
    slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    slot->candidate_restart_valid = InvalidXLogRecPtr;
    slot->candidate_restart_lsn = InvalidXLogRecPtr;
    slot->pgoutput = false;
    slot->subid = InvalidOid;
    slot->relid = InvalidOid;
    slot->shards = NULL;
    slot->npubs = 0;
    slot->alltables = NULL;
    slot->pubshards = NULL;
    slot->tables = NULL;
    slot->ntups_insert = 0;
    slot->ntups_delete = 0;
    slot->checksum_insert = 0;
    slot->checksum_delete = 0;

    /* 
     * Load the persistent state from the record.
     *
     * A confirmed_flush of 0 is treated as the initial state (redo on a
     * standby).  The original author's note: when both slot->data.slotid and
     * xlrec->slotid are valid, this node may be in recovery, with
     * RestoreSlotFromDisk already done and xlog redo running later, so the
     * persistent data may be out of date and each field is only updated
     * according to the comparison result.
     *
     * NOTE(review): slot->data was zeroed just above, so the outer condition
     * currently always holds -- confirm whether the reset is meant to
     * precede these comparisons.
     */
    if (0 == slot->data.confirmed_flush || slot->data.confirmed_flush < xlrec->confirmed_flush)
    {
        slot->data.slotid           = xlrec->slotid;
        StrNCpy(NameStr(slot->data.name),   NameStr(xlrec->slotname),    NAMEDATALEN);
        slot->data.database         = xlrec->database;
        slot->data.persistency      = xlrec->persistency;
        if (InvalidTransactionId == slot->data.xmin || TransactionIdPrecedes(slot->data.xmin, xlrec->xmin))
        {
            slot->data.xmin             = xlrec->xmin;
        }
        if (InvalidTransactionId == slot->data.catalog_xmin || TransactionIdPrecedes(slot->data.catalog_xmin, xlrec->catalog_xmin))
        {
            slot->data.catalog_xmin     = xlrec->catalog_xmin;
        }
        if (slot->data.restart_lsn < xlrec->restart_lsn)
        {
            slot->data.restart_lsn      = xlrec->restart_lsn;
        }
        if (slot->data.confirmed_flush < xlrec->confirmed_flush)
        {
            slot->data.confirmed_flush  = xlrec->confirmed_flush;
        }
        StrNCpy(NameStr(slot->data.plugin), NameStr(xlrec->pluginname),  NAMEDATALEN);
    }
    
    /* shared-memory-only fields of ReplicationSlot */
    if (InvalidTransactionId == slot->effective_xmin || TransactionIdPrecedes(slot->effective_xmin, xlrec->effective_xmin))
    {
        slot->effective_xmin        = xlrec->effective_xmin;
        slot->just_dirtied          = true;
        slot->dirty                 = true;
    }
    if (InvalidTransactionId == slot->effective_catalog_xmin || TransactionIdPrecedes(slot->effective_catalog_xmin, xlrec->effective_catalog_xmin))
    {
        slot->effective_catalog_xmin= xlrec->effective_catalog_xmin;
        slot->just_dirtied          = true;
        slot->dirty                 = true;
    }
    slot->pgoutput                  = xlrec->pgoutput;
    if (InvalidOid == slot->subid && InvalidOid != xlrec->subid)
    {
        slot->subid                 = xlrec->subid;
        StrNCpy(NameStr(slot->subname), NameStr(xlrec->subname), NAMEDATALEN);
    }
    /*
     * Take the relation id from the record when the slot has none.  The
     * second operand must test the record's relid; testing slot->relid on
     * both sides can never be true and would leave relid unset.
     */
    if (InvalidOid == slot->relid && InvalidOid != xlrec->relid)
    {
        slot->relid                 = xlrec->relid;
    }
    
    slot->in_use = true;

    ReplicationSlotsComputeRequiredLSN();
    ReplicationSlotsComputeRequiredXmin(false);

    CreateSlotOnDisk(slot);

    return;
}

/*
 * Redo handler for XLOG_REPLORIGIN_SLOT_DROP records.
 *
 * xlrec: decoded record carrying the id (and name) of the dropped slot.
 *
 * Logs the content of the shared-memory entry at index xlrec->slotid, hands
 * the entry to ReplicationSlotDropPtr() and then marks its slot id as
 * REPLICATION_SLOT_INVALID_ID.
 *
 * An out-of-range slot id raises PANIC.  A record whose slot id differs from
 * the id stored in the slot entry is logged and otherwise ignored.
 */
static void replication_slot_redo_drop(xl_replication_slot_drop * xlrec)
{
    ReplicationSlot*slot;
    
    if (xlrec->slotid <= REPLICATION_SLOT_INVALID_ID ||xlrec->slotid >= max_replication_slots)
    {
        elog(PANIC, "replication_slot_redo_drop: get invalid slotid %d", xlrec->slotid);
    }

    slot = &ReplicationSlotCtl->replication_slots[xlrec->slotid];

    /* 
     * if the entry does not carry this slot id, it is not ours: skip
     */
    if (slot->data.slotid != xlrec->slotid)
    {
        elog(LOG, "replication_slot_redo_drop: slot(id:%d) in wal, in_use:%d, slotid in data:%d", 
            xlrec->slotid, slot->in_use, slot->data.slotid);
        return;
    }

    /*
     * Dump the entry before it goes away.  The name and plugin are only
     * printed when the entry has a non-zero slot id and a non-empty name.
     */
    if (0 != slot->data.slotid && strncmp(NameStr(slot->data.name), "", NAMEDATALEN) !=0 )
    {
        elog(LOG, "replication_slot_redo_drop info in slot, slotid:%d, name:%s, database:%u, persistency:%d, xmin:%u, catalog_xmin:%u, "
            "restart_lsn:%X/%X, confirmed_flush:%X/%X, plugin:%s, in_use:%d, effective_xmin:%u, just_dirtied:%d, dirty:%d, pgoutput:%d, subid:%u, relid:%u ", 
            slot->data.slotid, 
            NameStr(slot->data.name), 
            slot->data.database,
            slot->data.persistency, slot->data.xmin, slot->data.catalog_xmin,
            (uint32) (slot->data.restart_lsn >> 32), (uint32) slot->data.restart_lsn,
            (uint32) (slot->data.confirmed_flush >> 32), (uint32) slot->data.confirmed_flush,
            NameStr(slot->data.plugin),
            slot->in_use,
            slot->effective_xmin, slot->just_dirtied, slot->dirty, slot->pgoutput, slot->subid, slot->relid);
    }
    else
    {
        elog(LOG, "replication_slot_redo_drop info in slot, slotid:%d, database:%u, persistency:%d, xmin:%u, catalog_xmin:%u, "
            "restart_lsn:%X/%X, confirmed_flush:%X/%X, in_use:%d, effective_xmin:%u, just_dirtied:%d, dirty:%d, pgoutput:%d, subid:%u, relid:%u ", 
            slot->data.slotid, 
            slot->data.database,
            slot->data.persistency, slot->data.xmin, slot->data.catalog_xmin,
            (uint32) (slot->data.restart_lsn >> 32), (uint32) slot->data.restart_lsn,
            (uint32) (slot->data.confirmed_flush >> 32), (uint32) slot->data.confirmed_flush,
            slot->in_use,
            slot->effective_xmin, slot->just_dirtied, slot->dirty, slot->pgoutput, slot->subid, slot->relid);
    }
#if 0
    elog(LOG, "replication_slot_redo_drop info in xlrec, slotid:%d, name:%s ",
            xlrec->slotid,  NameStr(xlrec->slotname));
#endif
    
    /* drop the slot (the original comment describes this step as removing its file) */
    ReplicationSlotDropPtr(slot);

    /* the entry no longer belongs to any slot id */
    slot->data.slotid       = REPLICATION_SLOT_INVALID_ID;

    return;
}

/*
 * Redo handler for XLOG_REPLORIGIN_SLOT_RENAME records.
 *
 * xlrec: decoded record carrying the slot id, the old slot name and the new
 *        slot name.
 *
 * Logs the content of the shared-memory entry at index xlrec->slotid and the
 * content of the record, reports a mismatch between the entry's name and the
 * record's old name, and calls ReplicationSlotRenamePtr() unless the entry
 * already carries the new name.
 *
 * An out-of-range slot id raises PANIC.  A record whose slot id differs from
 * the id stored in the slot entry is logged and otherwise ignored.
 */
static void
replication_slot_redo_rename(xl_replication_slot_rename * xlrec)
{
    ReplicationSlot *target;

    if (!(xlrec->slotid > REPLICATION_SLOT_INVALID_ID && xlrec->slotid < max_replication_slots))
    {
        elog(PANIC, "replication_slot_redo_rename: get invalid slotid %d", xlrec->slotid);
    }

    target = &ReplicationSlotCtl->replication_slots[xlrec->slotid];

    /* an entry that does not carry this slot id is not ours: skip */
    if (target->data.slotid != xlrec->slotid)
    {
        elog(LOG, "replication_slot_redo_rename: slot(id:%d) in wal, in_use:%d, slotid in data:%d",
             xlrec->slotid, target->in_use, target->data.slotid);
        return;
    }

    /*
     * Dump the entry before renaming.  With a zero slot id or an empty name
     * the name and plugin are left out of the message.
     */
    if (0 == target->data.slotid || strncmp(NameStr(target->data.name), "", NAMEDATALEN) == 0)
    {
        elog(LOG, "replication_slot_redo_rename info in slot, slotid:%d, database:%u, persistency:%d, xmin:%u, catalog_xmin:%u, "
                  "restart_lsn:%X/%X, confirmed_flush:%X/%X, in_use:%d, effective_xmin:%u, just_dirtied:%d, dirty:%d, pgoutput:%d, subid:%u, relid:%u ",
             target->data.slotid,
             target->data.database,
             target->data.persistency, target->data.xmin, target->data.catalog_xmin,
             (uint32) (target->data.restart_lsn >> 32), (uint32) target->data.restart_lsn,
             (uint32) (target->data.confirmed_flush >> 32), (uint32) target->data.confirmed_flush,
             target->in_use,
             target->effective_xmin, target->just_dirtied, target->dirty, target->pgoutput, target->subid, target->relid);
    }
    else
    {
        elog(LOG, "replication_slot_redo_rename info in slot, slotid:%d, name:%s, database:%u, persistency:%d, xmin:%u, catalog_xmin:%u, "
                  "restart_lsn:%X/%X, confirmed_flush:%X/%X, plugin:%s, in_use:%d, effective_xmin:%u, just_dirtied:%d, dirty:%d, pgoutput:%d, subid:%u, relid:%u ",
             target->data.slotid,
             NameStr(target->data.name),
             target->data.database,
             target->data.persistency, target->data.xmin, target->data.catalog_xmin,
             (uint32) (target->data.restart_lsn >> 32), (uint32) target->data.restart_lsn,
             (uint32) (target->data.confirmed_flush >> 32), (uint32) target->data.confirmed_flush,
             NameStr(target->data.plugin),
             target->in_use,
             target->effective_xmin, target->just_dirtied, target->dirty, target->pgoutput, target->subid, target->relid);
    }

    /* and what the record asks for */
    elog(LOG, "replication_slot_redo_rename info in xlrec, slotid:%d, old_name:%s, new_name:%s ",
         xlrec->slotid, NameStr(xlrec->old_slotname), NameStr(xlrec->new_slotname));

    /* the entry's current name is expected to be the record's old name; only report otherwise */
    if (strncmp(NameStr(target->data.name), NameStr(xlrec->old_slotname), NAMEDATALEN) != 0)
    {
        elog(LOG, "replication_slot_redo_rename old_name in xlog:%s is inconsistency with slot.name:%s", NameStr(xlrec->old_slotname), NameStr(target->data.name));
    }

    /* rename only when the entry does not already carry the new name */
    if (strncmp(NameStr(target->data.name), NameStr(xlrec->new_slotname), NAMEDATALEN) != 0)
    {
        ReplicationSlotRenamePtr(target, NameStr(xlrec->new_slotname));
    }
}

#endif
